{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 48,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<table style=\"border: 2px solid white;\">\n",
       "<tr>\n",
       "<td style=\"vertical-align: top; border: 0px solid white\">\n",
       "<h3>Client</h3>\n",
       "<ul>\n",
       "  <li><b>Scheduler: </b>tcp://127.0.0.1:45838\n",
       "  <li><b>Dashboard: </b><a href='http://127.0.0.1:8787' target='_blank'>http://127.0.0.1:8787</a>\n",
       "</ul>\n",
       "</td>\n",
       "<td style=\"vertical-align: top; border: 0px solid white\">\n",
       "<h3>Cluster</h3>\n",
       "<ul>\n",
       "  <li><b>Workers: </b>4</li>\n",
       "  <li><b>Cores: </b>4</li>\n",
       "  <li><b>Memory: </b>10.00 GB</li>\n",
       "</ul>\n",
       "</td>\n",
       "</tr>\n",
       "</table>"
      ],
      "text/plain": [
       "<Client: scheduler='tcp://127.0.0.1:45838' processes=4 cores=4>"
      ]
     },
     "execution_count": 48,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import os\n",
    "import numpy as np\n",
    "import scipy.sparse as sp\n",
    "import pandas as pd\n",
    "from glob import glob\n",
    "\n",
    "import dask\n",
    "import dask.bag as db\n",
    "import joblib\n",
    "\n",
    "from distributed import Client\n",
    "client = Client()\n",
    "client"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {},
   "outputs": [],
   "source": [
    "rm -rf sparse_chunks/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {},
   "outputs": [],
   "source": [
    "folder = 'sparse_chunks'\n",
    "n_features = int(1e5)\n",
    "n_informative = int(1e4)\n",
    "\n",
    "n_chunks = int(1e1)\n",
    "chunk_size = int(1e2)\n",
    "\n",
    "rng = np.random.RandomState(42)\n",
    "true_coef = rng.randn(n_features)\n",
    "true_coef[n_informative:] = 0\n",
    "\n",
    "\n",
    "def make_chunk(n_samples, true_coef, chunk_idx, format='csr',\n",
    "               density=1e-3, noise=1e-1):\n",
    "    rng = np.random.RandomState(chunk_idx)\n",
    "    n_features = true_coef.shape[0]\n",
    "    input_data = sp.rand(n_samples, n_features, format=format,\n",
    "                         density=density, random_state=rng)\n",
    "    noise = rng.normal(loc=0, scale=noise, size=n_samples)\n",
    "    target = input_data.dot(true_coef).ravel() + noise\n",
    "    return chunk_idx, input_data, (target > 0).astype(np.int32)\n",
    "\n",
    "\n",
    "def save_to_disk(chunk_idx, X, y, folder='sparse_chunks'):\n",
    "    os.makedirs(folder, exist_ok=True)\n",
    "    filename = \"sparse_chunk_{:04d}.pkl\".format(chunk_idx)\n",
    "    joblib.dump((X, y), os.path.join(folder, filename))\n",
    "    return filename\n",
    "\n",
    "\n",
    "def load_from_disk(chunk_idx, filename):\n",
    "    X, y = joblib.load(filename)\n",
    "    return chunk_idx, X, y"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 49,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Lazy loading chunks from sparse_chunks\n"
     ]
    }
   ],
   "source": [
    "if not os.path.exists(folder):\n",
    "    print(\"Generating chunks of sparse data into\", folder)\n",
    "    b = db.from_sequence([(chunk_size, true_coef, i)\n",
    "                          for i in range(n_chunks)])\n",
    "    b = b.starmap(make_chunk).starmap(save_to_disk).compute()\n",
    "\n",
    "\n",
    "    \n",
    "print(\"Lazy loading chunks from\", folder)\n",
    "b = db.from_sequence(enumerate(sorted(glob('sparse_chunks/*.pkl'))))\n",
    "b = b.starmap(load_from_disk)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 50,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 8 ms, sys: 0 ns, total: 8 ms\n",
      "Wall time: 9.53 ms\n"
     ]
    }
   ],
   "source": [
    "%time b = b.persist()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 51,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "10"
      ]
     },
     "execution_count": 51,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "len(b.compute())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 52,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 16 ms, sys: 4 ms, total: 20 ms\n",
      "Wall time: 25.8 ms\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "chunk_idx, X_0, y_0 = b.take(1)[0]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 53,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "0"
      ]
     },
     "execution_count": 53,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "chunk_idx"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 54,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<100x100000 sparse matrix of type '<class 'numpy.float64'>'\n",
       "\twith 10000 stored elements in Compressed Sparse Row format>"
      ]
     },
     "execution_count": 54,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "X_0"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 55,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "0.97999999999999998"
      ]
     },
     "execution_count": 55,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "np.mean((X_0.dot(true_coef).ravel() > 0) == y_0)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## L1-penalized Logistic Regression with SGD"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 56,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 760 ms, sys: 64 ms, total: 824 ms\n",
      "Wall time: 2.75 s\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "(0.53333333333333333, 0.032998316455372205, 0.46405999999999997)"
      ]
     },
     "execution_count": 56,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from sklearn.linear_model import SGDClassifier\n",
    "from sklearn.model_selection import train_test_split\n",
    "from dask import delayed\n",
    "\n",
    "CLASSES = np.array([0, 1])\n",
    "\n",
    "\n",
    "def scan_fit(model, chunk):\n",
    "    return model.partial_fit(*chunk, classes=CLASSES)\n",
    "\n",
    "\n",
    "def score(model, chunk):\n",
    "    return model.score(*chunk)\n",
    "\n",
    "\n",
    "all_filenames = sorted(glob('sparse_chunks/*.pkl'))\n",
    "train_filenames, test_filenames = train_test_split(\n",
    "    all_filenames, random_state=0)\n",
    "\n",
    "model = SGDClassifier(loss='log', alpha=1e-3, penalty='elasticnet', tol=0)\n",
    "\n",
    "for i in range(20):\n",
    "    for filename in train_filenames:\n",
    "        chunk = delayed(joblib.load)(filename)\n",
    "        model = delayed(scan_fit)(model, chunk)\n",
    "\n",
    "\n",
    "scores = [delayed(score)(model, delayed(joblib.load)(filename))\n",
    "          for filename in test_filenames]\n",
    "    \n",
    "%time scores, model = dask.compute(scores, model)\n",
    "np.mean(scores), np.std(scores), np.mean(model.coef_ != 0)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 57,
   "metadata": {},
   "outputs": [
    {
     "ename": "TypeError",
     "evalue": "'Future' object is not iterable",
     "output_type": "error",
     "traceback": [
      "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
      "\u001b[0;31mTypeError\u001b[0m                                 Traceback (most recent call last)",
      "\u001b[0;32m<ipython-input-57-4e7666067cab>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m\n\u001b[1;32m      5\u001b[0m     \u001b[0;32mreturn\u001b[0m \u001b[0mmodel\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mpartial_fit\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mX\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0my\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mclasses\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;36m1\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m      6\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 7\u001b[0;31m \u001b[0mb\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0maccumulate\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mscan_fit\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0minitial\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mmodel\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mto_delayed\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m-\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcompute\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mget\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mdask\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mget\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
      "\u001b[0;32m~/code/dask/dask/base.py\u001b[0m in \u001b[0;36mcompute\u001b[0;34m(self, **kwargs)\u001b[0m\n\u001b[1;32m     96\u001b[0m             \u001b[0mExtra\u001b[0m \u001b[0mkeywords\u001b[0m \u001b[0mto\u001b[0m \u001b[0mforward\u001b[0m \u001b[0mto\u001b[0m \u001b[0mthe\u001b[0m \u001b[0mscheduler\u001b[0m\u001b[0;31m \u001b[0m\u001b[0;31m`\u001b[0m\u001b[0;31m`\u001b[0m\u001b[0mget\u001b[0m\u001b[0;31m`\u001b[0m\u001b[0;31m`\u001b[0m \u001b[0mfunction\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m     97\u001b[0m         \"\"\"\n\u001b[0;32m---> 98\u001b[0;31m         \u001b[0;34m(\u001b[0m\u001b[0mresult\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mcompute\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mtraverse\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mFalse\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m     99\u001b[0m         \u001b[0;32mreturn\u001b[0m \u001b[0mresult\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    100\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/code/dask/dask/base.py\u001b[0m in \u001b[0;36mcompute\u001b[0;34m(*args, **kwargs)\u001b[0m\n\u001b[1;32m    203\u001b[0m     \u001b[0mdsk\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mcollections_to_dsk\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mvariables\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0moptimize_graph\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    204\u001b[0m     \u001b[0mkeys\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0mvar\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_keys\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mvar\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mvariables\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 205\u001b[0;31m     \u001b[0mresults\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mget\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdsk\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mkeys\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    206\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    207\u001b[0m     \u001b[0mresults_iter\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0miter\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mresults\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/code/dask/dask/local.py\u001b[0m in \u001b[0;36mget_sync\u001b[0;34m(dsk, keys, **kwargs)\u001b[0m\n\u001b[1;32m    560\u001b[0m     \"\"\"\n\u001b[1;32m    561\u001b[0m     \u001b[0mkwargs\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mpop\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'num_workers'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m)\u001b[0m    \u001b[0;31m# if num_workers present, remove it\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 562\u001b[0;31m     \u001b[0;32mreturn\u001b[0m \u001b[0mget_async\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mapply_sync\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;36m1\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mdsk\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mkeys\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    563\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    564\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/code/dask/dask/local.py\u001b[0m in \u001b[0;36mget_async\u001b[0;34m(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)\u001b[0m\n\u001b[1;32m    506\u001b[0m             \u001b[0;31m# Seed initial tasks into the thread pool\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    507\u001b[0m             \u001b[0;32mwhile\u001b[0m \u001b[0mstate\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m'ready'\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;32mand\u001b[0m \u001b[0mlen\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mstate\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m'running'\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m<\u001b[0m \u001b[0mnum_workers\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 508\u001b[0;31m                 \u001b[0mfire_task\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    509\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    510\u001b[0m             \u001b[0;31m# Main loop, wait on tasks to finish, insert new ones\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/code/dask/dask/local.py\u001b[0m in \u001b[0;36mfire_task\u001b[0;34m()\u001b[0m\n\u001b[1;32m    502\u001b[0m                             args=(key, dumps((dsk[key], data)),\n\u001b[1;32m    503\u001b[0m                                   dumps, loads, get_id, pack_exception),\n\u001b[0;32m--> 504\u001b[0;31m                             callback=queue.put)\n\u001b[0m\u001b[1;32m    505\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    506\u001b[0m             \u001b[0;31m# Seed initial tasks into the thread pool\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/code/dask/dask/local.py\u001b[0m in \u001b[0;36mapply_sync\u001b[0;34m(func, args, kwds, callback)\u001b[0m\n\u001b[1;32m    549\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mapply_sync\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mfunc\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0margs\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mkwds\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34m{\u001b[0m\u001b[0;34m}\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mcallback\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mNone\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    550\u001b[0m     \u001b[0;34m\"\"\" A naive synchronous version of apply_async \"\"\"\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 551\u001b[0;31m     \u001b[0mres\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwds\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    552\u001b[0m     \u001b[0;32mif\u001b[0m \u001b[0mcallback\u001b[0m \u001b[0;32mis\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    553\u001b[0m         \u001b[0mcallback\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mres\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/code/dask/dask/local.py\u001b[0m in \u001b[0;36mexecute_task\u001b[0;34m(key, task_info, dumps, loads, get_id, pack_exception)\u001b[0m\n\u001b[1;32m    293\u001b[0m         \u001b[0mfailed\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mFalse\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    294\u001b[0m     \u001b[0;32mexcept\u001b[0m \u001b[0mBaseException\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 295\u001b[0;31m         \u001b[0mresult\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mpack_exception\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0me\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mdumps\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    296\u001b[0m         \u001b[0mfailed\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mTrue\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    297\u001b[0m     \u001b[0;32mreturn\u001b[0m \u001b[0mkey\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mresult\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mfailed\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/code/dask/dask/local.py\u001b[0m in \u001b[0;36mexecute_task\u001b[0;34m(key, task_info, dumps, loads, get_id, pack_exception)\u001b[0m\n\u001b[1;32m    288\u001b[0m     \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    289\u001b[0m         \u001b[0mtask\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mdata\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mloads\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtask_info\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 290\u001b[0;31m         \u001b[0mresult\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0m_execute_task\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtask\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mdata\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    291\u001b[0m         \u001b[0mid\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mget_id\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    292\u001b[0m         \u001b[0mresult\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mdumps\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mresult\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mid\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/code/dask/dask/local.py\u001b[0m in \u001b[0;36m_execute_task\u001b[0;34m(arg, cache, dsk)\u001b[0m\n\u001b[1;32m    269\u001b[0m         \u001b[0mfunc\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0margs\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0marg\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0marg\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    270\u001b[0m         \u001b[0margs2\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0m_execute_task\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mcache\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0ma\u001b[0m \u001b[0;32min\u001b[0m \u001b[0margs\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 271\u001b[0;31m         \u001b[0;32mreturn\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0margs2\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    272\u001b[0m     \u001b[0;32melif\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0mishashable\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0marg\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    273\u001b[0m         \u001b[0;32mreturn\u001b[0m \u001b[0marg\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/code/dask/dask/bag/core.py\u001b[0m in \u001b[0;36maccumulate_part\u001b[0;34m(binop, seq, initial, is_first)\u001b[0m\n\u001b[1;32m   1273\u001b[0m         \u001b[0mres\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mlist\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0maccumulate\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mbinop\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mseq\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m   1274\u001b[0m     \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1275\u001b[0;31m         \u001b[0mres\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mlist\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0maccumulate\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mbinop\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mseq\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0minitial\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0minitial\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m   1276\u001b[0m     \u001b[0;32mif\u001b[0m \u001b[0mis_first\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m   1277\u001b[0m         \u001b[0;32mreturn\u001b[0m \u001b[0mres\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mres\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m-\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mres\u001b[0m \u001b[0;32melse\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0minitial\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/.virtualenvs/py36/lib/python3.6/site-packages/toolz/itertoolz.py\u001b[0m in \u001b[0;36maccumulate\u001b[0;34m(binop, seq, initial)\u001b[0m\n\u001b[1;32m     56\u001b[0m         \u001b[0mitertools\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0maccumulate\u001b[0m \u001b[0;34m:\u001b[0m  \u001b[0mIn\u001b[0m \u001b[0mstandard\u001b[0m \u001b[0mitertools\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mPython\u001b[0m \u001b[0;36m3.2\u001b[0m\u001b[0;34m+\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m     57\u001b[0m     \"\"\"\n\u001b[0;32m---> 58\u001b[0;31m     \u001b[0mseq\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0miter\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mseq\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m     59\u001b[0m     \u001b[0mresult\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mnext\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mseq\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0minitial\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0mno_default\u001b[0m \u001b[0;32melse\u001b[0m \u001b[0minitial\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m     60\u001b[0m     \u001b[0;32myield\u001b[0m \u001b[0mresult\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;31mTypeError\u001b[0m: 'Future' object is not iterable"
     ]
    }
   ],
   "source": [
    "model = SGDClassifier(loss='log', penalty='l1', max_iter=1)\n",
    "\n",
    "def scan_fit(model, next_chunk):\n",
    "    chunk_idx, X, y = next_chunk\n",
    "    return model.partial_fit(X, y, classes=[0, 1])\n",
    "\n",
    "b.accumulate(scan_fit, initial=model).to_delayed()[-1].compute(get=dask.get)[0]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
